Shuffle Process
Previously we've discussed Spark's physical plan and its execution details. But one thing is left untouched: how does data get through a `ShuffleDependency` to the next stage?
Shuffle Comparison between Hadoop and Spark
There're some differences and also similarities between the shuffle process in Hadoop and in Spark:
**From a high-level point of view, they are similar.** They both partition the mapper's (or `ShuffleMapTask` in Spark) output and send each partition to its corresponding reducer (in Spark, it could be a `ShuffleMapTask` in the next stage, or a `ResultTask`). The reducer buffers the data in memory, shuffles and aggregates the data, and applies the `reduce()` logic once the data is aggregated.
**From a low-level point of view, there are quite a few differences.** The shuffle in Hadoop is sort-based, since the records must be sorted before `combine()` and `reduce()`. The sort can be done by an external sort algorithm, thus allowing `combine()` or `reduce()` to tackle very large datasets. Currently in Spark the default shuffle process is hash-based: it usually uses a `HashMap` to aggregate the shuffle data, and no sort is applied. If the data needs to be sorted, the user has to call `sortByKey()` explicitly. In Spark 1.1, we can set the configuration `spark.shuffle.manager` to `sort` to enable sort-based shuffle. In Spark 1.2, the default shuffle process will be sort-based.
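For example, a minimal sketch of switching to the sort-based shuffle in an application that builds its own `SparkConf` (the application name is made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Select the sort-based shuffle introduced in Spark 1.1
// (the hash-based shuffle remains the default until Spark 1.2).
val conf = new SparkConf()
  .setAppName("shuffle-demo")               // hypothetical app name
  .set("spark.shuffle.manager", "sort")
val sc = new SparkContext(conf)
```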
**Implementation-wise, there are also differences.** As we know, there are well-defined steps in a Hadoop workflow: `map()`, `spill`, `merge`, `shuffle`, `sort` and `reduce()`. Each step has a predefined responsibility, and this fits the procedural programming model well. In Spark, however, there are no such fixed steps; instead we have stages and a series of transformations. So operations like `spill`, `merge` and `aggregate` need to be somehow included in the transformations.
If we call the mapper-side process of partitioning and persisting data "shuffle write", and the reducer-side process of reading and aggregating data "shuffle read", then the problems become: **How to integrate the shuffle write and shuffle read logic into Spark's logical or physical plan? And how to implement shuffle write and shuffle read efficiently?**
Shuffle Write
Shuffle write is a relatively simple task if a sorted output is not required: it partitions and persists the data. The persistence of data here has two advantages: reducing heap pressure and enhancing fault tolerance.
Its implementation is simple: add the shuffle write logic at the end of the `ShuffleMapStage` (in which there's a `ShuffleMapTask`). Each output record of the final RDD in this stage is partitioned and persisted, as shown in the following diagram:
In the diagram there are 4 `ShuffleMapTask`s executing on the same worker node, which has 2 cores. The task results (records of the final RDD in the stage) are written to local disk (data persistence). Each task has `R` buffers, where `R` equals the number of reducers (the number of tasks in the next stage). These buffers are called buckets in Spark. By default the size of each bucket is 32KB (100KB before Spark 1.1) and is configurable via `spark.shuffle.file.buffer.kb`.
In fact, bucket is a general concept in Spark that represents the location of the partitioned output of a `ShuffleMapTask`. Here, for simplicity, a bucket refers to an in-memory buffer.
`ShuffleMapTask` employs the pipelining technique to compute the result records of the final RDD. Each record is sent to the bucket of its corresponding partition, which is determined by `partitioner.partition(record.getKey())`. The contents of these buckets are written continuously to local disk files called `ShuffleBlockFile`, or `FileSegment` for short. Reducers will fetch their `FileSegment`s in the shuffle read phase.
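To make the routing concrete, here is a hedged, simplified sketch of how each record could be sent to its bucket; the bucket array, the record type and `numReducers` are assumptions for illustration, and the real writer code persists buckets to `FileSegment`s rather than keeping plain in-memory buffers:

```scala
import org.apache.spark.HashPartitioner
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: route each output record of the stage's final RDD
// to the in-memory bucket of its target reducer, as chosen by the partitioner.
val numReducers = 4                                    // R, assumed for the example
val partitioner = new HashPartitioner(numReducers)
val buckets = Array.fill(numReducers)(ArrayBuffer.empty[(String, Int)])

def writeRecord(record: (String, Int)): Unit = {
  val bucketId = partitioner.getPartition(record._1)   // partition chosen by the record's key
  buckets(bucketId) += record                          // later flushed to a FileSegment on disk
}

// writeRecord(("spark", 1)); writeRecord(("shuffle", 1))
```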
An implementation like this is very simple, but has some issues:
- **We may produce too many `FileSegment`s.** Each `ShuffleMapTask` produces `R` (the number of reducers) `FileSegment`s, so `M` `ShuffleMapTask`s will produce `M * R` files. For big datasets we could have large `M` and `R`, and as a result there may be a huge number of intermediate data files.
- **Buffers could take a lot of space.** Over the lifetime of a job, up to `M * R` buckets could be created on a worker node. Spark reuses the buffer space after a `ShuffleMapTask` finishes, but there can still be `cores * R` buckets in memory at the same time. On a node with 8 cores processing a 1000-reducer job, the buckets will take up 256MB (`R * cores * 32KB`).
Currently, there's no good solution to the second problem: we need to write buffers anyway, and if they're too small there will be an impact on IO speed. For the first problem, a file consolidation solution is already implemented in Spark. Let's check it out:
It's clear from the above diagram that consecutive `ShuffleMapTask`s running on the same core share a shuffle file. Each task appends its output data, `ShuffleBlock i'`, after the output data of the previous task, `ShuffleBlock i`. A `ShuffleBlock` is called a `FileSegment`. In this way, reducers in the next stage can just fetch their segment of the whole file, and the number of files needed on each worker node is reduced to `cores * R`. The file consolidation feature can be activated by setting `spark.shuffle.consolidateFiles` to `true`.
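As a minimal configuration sketch (the buffer value shown is the default discussed above; consolidation must be switched on explicitly):

```scala
import org.apache.spark.SparkConf

// Enable shuffle file consolidation and set the per-bucket write buffer size.
val conf = new SparkConf()
  .set("spark.shuffle.consolidateFiles", "true")
  .set("spark.shuffle.file.buffer.kb", "32")   // bucket size in KB (the default)
```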
Shuffle Read
Let's check the physical plan of `reduceByKey`, which contains a `ShuffleDependency`:
Intuitively, we need to fetch the data of `MapPartitionsRDD` to be able to evaluate `ShuffledRDD`. Then come the problems:
- When to fetch? Fetch for each `ShuffleMapTask`, or fetch only once after all `ShuffleMapTask`s are done?
- Fetch and process the records at the same time, or fetch and then process?
- Where to store the fetched data?
- How do the tasks of the next stage know the location of the fetched data?
Solutions in Spark:
**When to fetch?** Wait until all `ShuffleMapTask`s end, then fetch. We know that a stage is executed only after its parent stages have been executed, so it's intuitive that the fetch operation begins once all `ShuffleMapTask`s in the previous stage are done. The fetched `FileSegment`s have to be buffered in memory, so we can't fetch too much before the buffer content is written to disk. Spark limits this buffer size by `spark.reducer.maxMbInFlight`; here we name it `softBuffer`. Its default size is 48MB. A `softBuffer` usually contains multiple fetched `FileSegment`s, but sometimes a single segment can fill up the buffer.

**Fetch and process the records at the same time, or fetch and then process?** Fetch and process the records at the same time. In MapReduce, the shuffle stage fetches the data and applies the `combine()` logic at the same time, but since the reducer input data needs to be sorted, the `reduce()` logic is applied only after the shuffle-sort process. Since Spark does not require a sorted order for the reducer input data, we don't need to wait until all the data gets fetched to start processing.

**Then how does Spark implement this fetching and processing?** In fact, Spark uses data structures like hash maps to do the job. Each `<Key, Value>` pair from the shuffle process is inserted into a hash map. If the `Key` is already present, the pair is aggregated by `func(hashMap.get(Key), Value)`. In the WordCount example above, `func` is `hashMap.get(Key) + Value`, and its result is written back into the hash map. This `func` has a similar role to `reduce()` in Hadoop, but the two differ in details. We illustrate the difference with the following code snippet:

```
// MapReduce
reduce(K key, Iterable<V> values) {
    result = process(key, values)
    return result
}

// Spark
reduce(K key, Iterable<V> values) {
    result = null
    for (V value : values)
        result = func(result, value)
    return result
}
```
In Hadoop MapReduce, we can define any data structure we like inside the `process` function; it's just a function that takes an `Iterable` as parameter, and we can also choose to cache the `values` for further processing. In Spark, a `foldLeft`-like technique is used to apply the `func`. For example, in Hadoop it's very easy to compute the average of `values`: `sum(values) / values.length`. But this is not directly possible in the Spark model, since `func` only ever sees one new value at a time. We'll come back to this point later; a possible workaround is sketched below.
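To make this concrete, here is a hedged sketch of how a per-key average can still be computed in Spark by carrying a `(sum, count)` accumulator through the aggregation instead of a whole `Iterable` of values; the sample data and the use of `combineByKey` are assumptions for illustration:

```scala
import org.apache.spark.SparkContext._   // pair-RDD implicits (needed in Spark 1.x)

// Assuming an existing SparkContext `sc` and some sample pairs.
val pairs = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

val averages = pairs
  .combineByKey(
    (v: Double) => (v, 1L),                                            // create the accumulator
    (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1L),     // fold one value in
    (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2) // merge accumulators
  )
  .mapValues { case (sum, count) => sum / count }                      // ("a", 2.0), ("b", 2.0)
```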
**Where to store the fetched data?** The fetched `FileSegment`s get buffered in the `softBuffer`. Then the data is processed and written to a configurable location. If `spark.shuffle.spill` is `false`, memory is the only write location, and a special data structure, `AppendOnlyMap`, is used to hold the processed data in memory. Otherwise, the processed data is written to memory and disk, using `ExternalAppendOnlyMap`; this data structure can spill sorted key-value pairs to disk when there isn't enough memory available. **A key problem in using both memory and disk is how to strike a balance between the two.** In Hadoop, by default 70% of the memory is reserved for shuffle data, and once 66% of this part of the memory is used, Hadoop starts the merge-combine-spill process. Spark uses a similar strategy; we'll talk about its details later in this chapter.

**How do the tasks of the next stage know the location of the fetched data?** Recall from the last chapter that there's an important step: the `ShuffleMapStage` registers its final RDD by calling `MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size)`. So during the shuffle process, reducers get the data location by querying `MapOutputTrackerMaster` in the driver process. When a `ShuffleMapTask` finishes, it reports the location of its `FileSegment`s to `MapOutputTrackerMaster`.
Now that we have discussed the main ideas behind shuffle write and shuffle read as well as some implementation details, let's dive into some interesting cases.
Shuffle Read of Typical Transformations
reduceByKey(func)
We have briefly talked about the fetch and reduce process of `reduceByKey()`. Note that for an RDD, not all of its data is present in memory at a given time: records are processed one at a time, and already-processed records are discarded as early as possible. At the record level, the `reduce()` logic can be shown as below:

We can see that the fetched records are aggregated using a hash map, and once all the records are aggregated, we have the result. The `func` needs to be commutative.
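As a hedged illustration of this record-by-record aggregation, the sketch below mimics what the shuffle-read side conceptually does with a hash map; the names `fetchedRecords` and `func` are invented for the example:

```scala
import scala.collection.mutable

// Simplified sketch: aggregate fetched (key, value) records in a hash map,
// the way reduceByKey's shuffle read conceptually works.
def aggregate[K, V](fetchedRecords: Iterator[(K, V)], func: (V, V) => V): Iterator[(K, V)] = {
  val hashMap = mutable.HashMap.empty[K, V]
  for ((k, v) <- fetchedRecords) {
    hashMap(k) = hashMap.get(k) match {
      case Some(existing) => func(existing, v)  // aggregate with the value already present
      case None           => v                  // first occurrence of this key
    }
  }
  hashMap.iterator
}

// Word-count style usage: func must be commutative, since fetch order is arbitrary.
// aggregate(Iterator(("a", 1), ("b", 1), ("a", 1)), (x: Int, y: Int) => x + y)
```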
A `mapPartitionsWithContext` operation is used to transform the `ShuffledRDD` into a `MapPartitionsRDD`.
To reduce network traffic between nodes, we could use map-side `combine()` in Hadoop. It's also feasible in Spark: all we need is to apply `mapPartitionsWithContext` in the `ShuffleMapStage`. For example, in `reduceByKey`, the transformation of `ParallelCollectionRDD` to `MapPartitionsRDD` is equivalent to a map-side combine.
Comparison between `map()`-`reduce()` in Hadoop and `reduceByKey` in Spark
- **map side:** there's no difference on the map side. For `combine()` logic, Hadoop imposes a sort before `combine()`, while Spark applies the `combine()` logic by using a hash map.
- **reduce side:** the shuffle process in Hadoop fetches the data until a certain amount is reached, then applies `combine()` logic, then merge-sorts the data to feed the `reduce()` function. In Spark, fetch and reduce are done at the same time (in a hash map), so the reduce function needs to be commutative.
Comparison in terms of memory usage
- **map side:** Hadoop needs a big, circular buffer to hold and sort the `map()` output data, but `combine()` does not need extra space. Spark needs a hash map to do `combine()`, and persisting records to local disk needs buffers (buckets).
- **reduce side:** Hadoop needs some memory space to store the shuffled data; `combine()` and `reduce()` require no extra space, since their input is sorted and can be grouped and then aggregated. In Spark, a `softBuffer` is needed for fetching. A hash map is used for storing the result of `combine()` and `reduce()` if only memory is used for processing the data; however, part of the data can be stored on disk if configured to use both memory and disk.
groupByKey(numPartitions)
The process is similar to that of `reduceByKey()`. The `func` becomes `result = result ++ record.value`. This means that each key's values are grouped together without further aggregation.
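A small, hedged usage example contrasting the two (assuming an existing SparkContext `sc`):

```scala
import org.apache.spark.SparkContext._   // pair-RDD implicits (needed in Spark 1.x)

// groupByKey only collects the values per key; reduceByKey aggregates them.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

val grouped = pairs.groupByKey(2)        // ("a", Iterable(1, 3)), ("b", Iterable(2))
val reduced = pairs.reduceByKey(_ + _)   // ("a", 4), ("b", 2)
```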
distinct(numPartitions)
Similar to `reduceByKey()`. The `func` is `result = result == null ? record.value : result`. This means that we check for the existence of the record in the `HashMap`: if it exists, discard the record, otherwise insert it into the map. Like `reduceByKey()`, there's a map-side `combine()`.
cogroup(otherRDD, numPartitions)
There could be 0, 1 or multiple `ShuffleDependency`s for a `CoGroupedRDD`. But the shuffle process does not create a hash map for each shuffle dependency; it uses one hash map for all of them. Unlike `reduceByKey`, the hash map is constructed in the RDD's `compute()` rather than in `mapPartitionsWithContext()`.
A task executing this RDD allocates an `Array[ArrayBuffer]`. This array contains the same number of empty `ArrayBuffer`s as there are input RDDs, so in the example we have 2 `ArrayBuffer`s in each task. When a key-value pair comes from RDD A, we add it to the first `ArrayBuffer`; if a key-value pair comes from RDD B, it goes to the second one. Finally, a `mapValues()` operation transforms the values into the correct type: `(ArrayBuffer, ArrayBuffer) => (Iterable[V], Iterable[W])`.
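A heavily simplified, hedged sketch of this per-key bookkeeping (the map name and types are invented for the example; the real code lives inside `CoGroupedRDD`):

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Per key, keep one ArrayBuffer per input RDD: index 0 for RDD A, index 1 for RDD B.
val combined = mutable.HashMap.empty[String, Array[ArrayBuffer[Any]]]

def insert(key: String, value: Any, rddIndex: Int): Unit = {
  val buffers = combined.getOrElseUpdate(key, Array.fill(2)(ArrayBuffer.empty[Any]))
  buffers(rddIndex) += value   // the value lands in the buffer of its source RDD
}

// insert("k", 1, 0); insert("k", "x", 1)
// combined("k") is Array(ArrayBuffer(1), ArrayBuffer("x"))
```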
intersection(otherRDD) and join(otherRDD, numPartitions)
These two operations both use `cogroup`, so their shuffle process is identical to that of `cogroup`.
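A hedged sketch of that equivalence for `join` (this mirrors the idea rather than the exact library code):

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._   // pair-RDD implicits (needed in Spark 1.x)

// join expressed on top of cogroup: per key, pair up every value from `a`
// with every value from `b`.
def myJoin[K: ClassTag, V: ClassTag, W: ClassTag](a: RDD[(K, V)],
                                                  b: RDD[(K, W)],
                                                  numPartitions: Int): RDD[(K, (V, W))] =
  a.cogroup(b, numPartitions).flatMapValues { case (vs, ws) =>
    for (v <- vs; w <- ws) yield (v, w)   // cartesian product of the two value groups per key
  }
```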
sortByKey(ascending, numPartitions)
The processing logic of `sortByKey()` is a little different from that of `reduceByKey()`, as it does not use a `HashMap` to handle the incoming fetched records. Instead, all key-value pairs are range-partitioned, and the records of the same partition are sorted by key.
coalesce(numPartitions, shuffle = true)
`coalesce()` would create a `ShuffleDependency`, but it does not actually need to aggregate the fetched records, so no hash map is needed.
HashMap in Shuffle Read
As we have seen, the hash map is a frequently used data structure in Spark's shuffle process. Spark has 2 specialized hash map implementations: the in-memory `AppendOnlyMap` and the memory-disk hybrid `ExternalAppendOnlyMap`. Let's look at some details of these two hash maps.
AppendOnlyMap
The Spark documentation describes `AppendOnlyMap` as "A simple open hash table optimized for the append-only use case, where keys are never removed, but the value for each key may be changed". Its implementation is simple: allocate a big array of `Object`, as the following diagram shows. Keys are stored in the blue sections, and values in the white sections.
When a `put(K, V)` is issued, we locate the slot in the array by `hash(K)`. If the position is already occupied, quadratic probing is used to find the next slot. For `K6` in the diagram, a third probe finds an empty slot after `K4`, so the value is inserted right after the key. When calling `get(K6)`, we use the same technique to find the slot, read `V6` from the next slot, compute a new value, then write it back to the position of `V6`.
Iteration over the `AppendOnlyMap` is just a scan of the array.
If 70% of the allocated array is used, then it will grow twice as large. Keys will be rehashed and the positions re-organized.
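For intuition, here is a much-simplified, hedged sketch of this layout and probing scheme (the real `AppendOnlyMap` also handles null keys, tracks its size, and rehashes on growth):

```scala
// Simplified append-only open hash table: keys at even indexes,
// their values at the following odd indexes, quadratic probing on collision.
class TinyAppendOnlyMap[K, V](capacity: Int = 64) {  // capacity assumed to be a power of 2
  private val data = new Array[AnyRef](2 * capacity)
  private val mask = capacity - 1

  // Insert, or aggregate with the existing value via updateFunc(old, newValue).
  def changeValue(key: K, value: V, updateFunc: (V, V) => V): Unit = {
    var pos = key.hashCode() & mask
    var delta = 1
    while (true) {
      val curKey = data(2 * pos)
      if (curKey == null) {                       // empty slot: append key and value
        data(2 * pos) = key.asInstanceOf[AnyRef]
        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
        return
      } else if (curKey == key) {                 // key present: update the value in place
        val old = data(2 * pos + 1).asInstanceOf[V]
        data(2 * pos + 1) = updateFunc(old, value).asInstanceOf[AnyRef]
        return
      } else {                                    // occupied by another key: quadratic probing
        pos = (pos + delta) & mask
        delta += 1
      }
    }
  }

  def get(key: K): Option[V] = {
    var pos = key.hashCode() & mask
    var delta = 1
    while (true) {
      val curKey = data(2 * pos)
      if (curKey == null) return None
      if (curKey == key) return Some(data(2 * pos + 1).asInstanceOf[V])
      pos = (pos + delta) & mask
      delta += 1
    }
    None // never reached
  }
}
```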
There's a `destructiveSortedIterator(): Iterator[(K, V)]` method in `AppendOnlyMap` that returns the key-value pairs in sorted order. It is implemented like this: first compact all key-value pairs to the front of the array, placing each key-value pair in a single slot; then `Array.sort()` is called to sort the array. As its name indicates, this operation destroys the structure.
ExternalAppendOnlyMap
Compared with `AppendOnlyMap`, the implementation of `ExternalAppendOnlyMap` is more sophisticated. Its concept is similar to the shuffle-merge-combine-sort process in Hadoop.
`ExternalAppendOnlyMap` holds an `AppendOnlyMap`, and incoming key-value pairs are inserted into it. When the `AppendOnlyMap` is about to grow, the available memory is checked: if there's still enough space, the `AppendOnlyMap` doubles its size; otherwise all its key-value pairs are sorted and then spilled to local disk (using `destructiveSortedIterator()`). In the diagram there are 4 spills of this map. In each spill, a `spillMap` file is generated and a new, empty `AppendOnlyMap` is instantiated to receive incoming key-value pairs. In `ExternalAppendOnlyMap`, when a key-value pair is inserted, it gets aggregated only with the in-memory part (the `AppendOnlyMap`). So when the final result is requested, a global merge-aggregate needs to be performed over all the spilled maps and the in-memory `AppendOnlyMap`.
The global merge-aggregate runs as follows. First the in-memory part (the `AppendOnlyMap`) is sorted into a `sortedMap`. Then a `DestructiveSortedIterator` (for the `sortedMap`) or a `DiskMapIterator` (for each on-disk `spillMap`) is used to read a chunk of key-value pairs into a `StreamBuffer`, and each `StreamBuffer` is inserted into a `mergeHeap`. Within a `StreamBuffer`, all records have the same `hash(key)`. Suppose that in the example we have `hash(K1) == hash(K2) == hash(K3) < hash(K4) < hash(K5)`; then the first 3 records of the first spilled map are read into the same `StreamBuffer`.

The merge itself is simple: using the heap, get the `StreamBuffer`s whose keys have the same hash and put them into an `ArrayBuffer[StreamBuffer]` (`mergedBuffers`) for merging. The first inserted `StreamBuffer` is called `minBuffer`, and the key of its first key-value pair is `minKey`. One merge operation aggregates all KV pairs with `minKey` in the `mergedBuffers` and then outputs the result. When a merge operation over `mergedBuffers` is over, the remaining KV pairs go back to the `mergeHeap`, and any emptied `StreamBuffer` is replaced by a new read from the in-memory map or an on-disk spill.
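To make the merge idea more concrete, here is a heavily simplified, hedged sketch (not the actual implementation): several spill iterators, each sorted by key hash, are merged through a priority queue, and records with equal keys are aggregated as they are dequeued. The real code must additionally handle distinct keys that share the same hash.

```scala
import scala.collection.mutable

// Simplified sketch of the global merge-aggregate: each input iterator
// (the sorted in-memory map plus the on-disk spills) yields (key, value)
// pairs ordered by hash(key); we merge them with a min-heap on that hash.
def mergeAggregate[K, V](sortedSpills: Seq[Iterator[(K, V)]],
                         combine: (V, V) => V): Iterator[(K, V)] = {
  case class Head(key: K, value: V, rest: Iterator[(K, V)])
  // PriorityQueue is a max-heap, so reverse the ordering to get a min-heap.
  val heap = mutable.PriorityQueue.empty[Head](Ordering.by[Head, Int](_.key.hashCode()).reverse)
  for (it <- sortedSpills if it.hasNext) {
    val (k, v) = it.next(); heap.enqueue(Head(k, v, it))
  }

  new Iterator[(K, V)] {
    def hasNext: Boolean = heap.nonEmpty
    def next(): (K, V) = {
      val first = heap.dequeue()
      var merged = first.value
      // Aggregate every queued head record that has exactly the same key.
      while (heap.nonEmpty && heap.head.key == first.key) {
        val other = heap.dequeue()
        merged = combine(merged, other.value)
        if (other.rest.hasNext) { val (k, v) = other.rest.next(); heap.enqueue(Head(k, v, other.rest)) }
      }
      if (first.rest.hasNext) { val (k, v) = first.rest.next(); heap.enqueue(Head(k, v, first.rest)) }
      (first.key, merged)
    }
  }
}
```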
There are still 3 points that need to be discussed:
- **Available memory check.** Hadoop allocates 70% of a reducer's memory space for shuffle-sort. Similarly, Spark reserves `spark.shuffle.memoryFraction * spark.shuffle.safetyFraction` (defaults to 0.3 * 0.8) for `ExternalAppendOnlyMap`. It seems that Spark is more conservative. Moreover, this 24% of memory space is shared by all reducers in the same executor. An executor holds a `ShuffleMemoryMap: HashMap[threadId, occupiedMemory]` to monitor the memory usage of all `ExternalAppendOnlyMap`s across its reducers. Before an `AppendOnlyMap` grows, the total memory usage after the growth is computed using the information in `ShuffleMemoryMap` to see if there's enough space. Also note that the first 1000 records do not trigger the spill check. (A back-of-the-envelope computation of this budget follows this list.)
- **`AppendOnlyMap` size estimation.** To know the size of an `AppendOnlyMap`, we could compute the size of every object referenced by the structure at each growth, but this takes too much time. Spark has an estimation algorithm with O(1) complexity: its core idea is to track how the map size changes after the insertion and aggregation of a certain number of records, and use that to estimate the structure's size. Details are in `SizeTrackingAppendOnlyMap` and `SizeEstimator`.
- **Spill process.** Like shuffle write, Spark creates a buffer when spilling records to disk. Its size is `spark.shuffle.file.buffer.kb`, defaulting to 32KB. Since the serializer also allocates buffers to do its job, there will be problems when trying to spill a large number of records at the same time, so Spark limits the number of records that can be spilled at once to `spark.shuffle.spill.batchSize`, with a default value of 10000.
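As a back-of-the-envelope illustration of the memory budget mentioned above (hypothetical executor size, default fractions):

```scala
// Hypothetical executor with a 4 GB heap.
val executorMemoryMB = 4096.0
val memoryFraction   = 0.3   // spark.shuffle.memoryFraction (default)
val safetyFraction   = 0.8   // spark.shuffle.safetyFraction (default)

// Memory shared by all ExternalAppendOnlyMaps in this executor: ~24% of the heap.
val shuffleBudgetMB = executorMemoryMB * memoryFraction * safetyFraction  // 983.04 MB
```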
Discussion
As we've seen in this chapter, Spark is way more flexible in the shuffle process than Hadoop's fixed shuffle-combine-merge-reduce model. In Spark it's possible to combine different shuffle strategies with different data structures to design an appropriate shuffle process based on the semantics of the actual transformation.
So far we've discussed the shuffle process in Spark without sorting as well as how this process gets integrated into the actual execution of the RDD chain. We've also talked about memory and disk issues and compared some details with Hadoop. In the next chapter we'll try to describe job execution from an inter-process communication perspective. The shuffle data location problem will also be mentioned.
In addition to this chapter, there's an outstanding blog post (in Chinese) by Jerry Shao, Deep Dive into Spark's shuffle implementation.